kafka connector 向mysql写数据时 如何指定key和value的schema?

您所在的位置:网站首页 kafka sink mysql kafka connector 向mysql写数据时 如何指定key和value的schema?

kafka connector 向mysql写数据时 如何指定key和value的schema?

#kafka connector 向mysql写数据时 如何指定key和value的schema?| 来源: 网络整理| 查看: 265

恰巧,本人最近在做一个这个方面的专题研究,我来说说自己对这你这个问题的看法。

首先,你是从kafka 的topic 把数据流向 mysql,要用的Kafka Connector应该是Sink类型的。目前,Confluent 3.3已经有Kafka的JDBC Connector,可以完成这个事情。

第二,如果你从Conflent的官网下载了Confluent (本回答发生时,为3.3 版本,分为社区版和企业版,下载哪个版本,要看你自己情况),里面自带了 jdbc的Connector,这个Connector包含了两个部分,一个是Source类型的(就是通过jdbc把数据导入到Kafka的),另外一个是Sink类型的(就是把Kafka的topic数据直接导入到jdbc连接的数据库的)。

第三,从你的问题知道,你应该已经接触了Kafka Connector了,关于怎么安装、启动服务、启动Connector我就不赘述了,你应该已经知道了。我要说的一点,就是,你通过设置 一个 xxx.properties文件,这个文件的内容大致如下:

name=test-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=orders connection.url=jdbc:mysql://mysqlhost:port/dbname?name=xxx&password=xxxx auto.create=true

具体字段意义很明了,然后通过启动Connector的命令启动,我从官网上摘下来这个命令你给参考下:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-mysql.properties(这个文件就是上面定义的xxx.properties)

这样,你Kafka里面的 orders 的topic的内容,就会在你指定的mysql数据库中创建一个orders表,关于你说的Schema,并不需要指定。

另外,需要注意的一点,当你用上面的命令启动一个connector的时候,

etc/schema-registry/connect-avro-standalone.properties

这个参数配置文件中,会启动一个restful服务,这个服务默认端口是8083,如果你在一台主机同时启动多个connector的时候,需要修改配置文件中的端口号。

现在从另外一个角度来回答你标题中的问题:如果你用 avro-console-productor的时候,是要指定Schema的,但是用connector的时候,不需要这么做。

不知道我的回答是否能解决你的问题,也欢迎大家拍砖!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3